[SPARK-55751][SS] Add metrics on state store loads from cloud storage#54567
[SPARK-55751][SS] Add metrics on state store loads from cloud storage#54567gnanda wants to merge 3 commits intoapache:masterfrom
Conversation
761843b to
b36d880
Compare
b36d880 to
910b000
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
Outdated
Show resolved
Hide resolved
65f059f to
22cd93d
Compare
|
Can we add a test in the Auto Snapshot Repair Suite, where this metric would be > 1? |
|
ericm-db
left a comment
There was a problem hiding this comment.
lgtm, thanks for adding the test
| @volatile private var performedSnapshotAutoRepair = false | ||
|
|
||
| // Number of DFS (cloud) fetches performed when loading the current version | ||
| @volatile private var numCloudLoads = 0L |
There was a problem hiding this comment.
This should never be greater than 1 for a given store instance. Hence can be a boolean. For a store if loaded from DFS then should be 1, if not, 0. Just like the performedSnapshotAutoRepair above.
Also lets avoid referring to this as cloud, you would see that existing code calls it DFS. Because spark can run using storage that isn't cloud.
| val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, rocksDBFileMapping, None) | ||
| val metadata = fetchCheckpointFromDfs(0) | ||
| // No real snapshot exists at this version; advance loadedVersion to the target | ||
| // so the next commit produces version + 1 rather than 1. |
There was a problem hiding this comment.
please lets remove this comment. It isn't correct
| * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot version, | ||
| * and increments [[numCloudLoads]]. Returns the checkpoint metadata. | ||
| * Callers are responsible for calling [[openLocalRocksDB]], setting | ||
| * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] overrides. |
There was a problem hiding this comment.
please lets remove these 2 lines of comment. It is not essential for this func
| * Callers are responsible for calling [[openLocalRocksDB]], setting | ||
| * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] overrides. | ||
| */ | ||
| private def fetchCheckpointFromDfs( |
| numCloudLoads += 1 | ||
| val metadata = fileManager.loadCheckpointFromDfs( | ||
| snapshotVersion, workingDir, rocksDBFileMapping, uniqueId) | ||
| loadedVersion = snapshotVersion |
There was a problem hiding this comment.
Honestly I wouldn't add this extra function, makes the side effect even more difficult to reason about from the calling function. Just set the load flag that you are introducing at the source. It is a one-line change.
| "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount")) | ||
| "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount", | ||
| "rocksdbLoadedFromCloud")) | ||
| assert(stateOperatorMetrics.customMetrics.get("rocksdbNumSnapshotsAutoRepaired") == 0, |
There was a problem hiding this comment.
lets verify the metric value too
|
|
||
| val m1 = db.metricsOpt.get | ||
| assert(m1.loadMetrics("load") > 0) | ||
| assert(m1.loadMetrics("numCloudLoads") === 1) |
There was a problem hiding this comment.
i don't see any other test where you are also verifying load is 0
| db.load(1) | ||
| db.put("b", "10") | ||
| db.commit() | ||
| db.doMaintenance() |
| db.commit() // a new snapshot (5.zip) will be created since previous one is corrupt | ||
| assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1) | ||
| // 4.zip was tried and failed (1 load), then 2.zip succeeded (2 loads) | ||
| assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 2) |
| db.commit() | ||
| assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1) | ||
| // 5.zip failed (1), 4.zip failed (2), 2.zip failed (3), then version 0 succeeded (4) | ||
| assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 4) |
What changes were proposed in this pull request?
This PR adds a new metric, rocksdbLoadedFromCloud, that tracks how many times the RocksDB state store fetched a snapshot from remote (cloud/DFS) storage during a single load operation.
The implementation introduces a fetchCheckpointFromDfs helper in RocksDB that centralizes all fileManager.loadCheckpointFromDfs(...) call sites. Each call through this helper increments a per-load numCloudLoads counter, which is then emitted
via loadMetrics (under both the load and loadFromSnapshot paths). The counter is reset at the start of each load.
The metric is surfaced as a StateStoreCustomSumMetric named rocksdbLoadedFromCloud and included in the full list of custom metrics reported by RocksDBStateStoreProvider.
Why are the changes needed?
Fetching state from cloud storage is significantly more expensive than a local cache hit. Without this metric, there is no way to distinguish a load that was fully served from local disk from one that required one or more round-trips to cloud
storage. This metric gives operators and developers visibility into cloud load frequency, which is useful for diagnosing performance regressions, tuning snapshot and changelog checkpointing configuration, and understanding cost implications of
state store operations.
Does this PR introduce any user-facing change?
Yes. A new custom metric rocksdbLoadedFromCloud ("RocksDB: load - number of times state was loaded from cloud storage") is now reported in the Structured Streaming progress reporter under stateOperatorMetrics.customMetrics.
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code 2.1.58